package com.executor.gateway.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;


/**
 * RabbitMQ configuration for the gateway: declares the connection factory, a
 * prototype-scoped {@link RabbitTemplate} with confirm/return logging callbacks,
 * the direct exchange, the in-gate queue with its binding, and a
 * {@link RabbitTransactionManager}.
 *
 * <p>Created 2019/4/17 09:47.
 *
 * @author miaoguoxin
 */
@Configuration
@Slf4j
public class RabbitConfig {
    /** Name of the direct exchange, read from the {@code mq.exchange} property. */
    @Value("${mq.exchange}")
    private String payExchange;

    /** Name of the in-gate queue, read from the {@code mq.ingate.queue} property. */
    @Value("${mq.ingate.queue}")
    private String ingateQueue;

    /** Spring Boot's bound {@code spring.rabbitmq.*} settings. */
    @Autowired
    private RabbitProperties rabbitProperties;

    /**
     * Builds the caching connection factory from {@link RabbitProperties}
     * (host, port, credentials, virtual host, publisher confirm/return flags).
     *
     * @return the connection factory shared by the template and the transaction manager
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses(rabbitProperties.getHost() + ":" + rabbitProperties.getPort());
        factory.setUsername(rabbitProperties.getUsername());
        factory.setPassword(rabbitProperties.getPassword());
        factory.setVirtualHost(rabbitProperties.getVirtualHost());
        // These flags must be true for the template's confirm/return callbacks to be invoked.
        factory.setPublisherConfirms(rabbitProperties.isPublisherConfirms());
        factory.setPublisherReturns(rabbitProperties.isPublisherReturns());
        return factory;
    }

    /**
     * Creates a {@link RabbitTemplate} that publishes on transactional channels and
     * logs publisher confirms and returned messages.
     *
     * <p>Prototype-scoped because callbacks are set on the template: with a singleton,
     * the effective callback would be whichever one was set last.
     *
     * @return a new template instance on every lookup
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        // Publisher confirms cannot be used once the channel is transactional.
        // NOTE(review): the confirm callback below is therefore not expected to fire
        // while channelTransacted is true — confirm which of the two modes is intended.
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            // correlationData is null when the message was sent without CorrelationData,
            // so let the logger render it instead of calling toString() on it.
            if (ack) {
                log.info("消息已确认 cause:{} - {}", cause, correlationData);
            } else {
                log.warn("消息未确认 cause:{} - {}", cause, correlationData);
            }
        });
        // NOTE(review): returns are only delivered for messages published as mandatory;
        // this template never calls setMandatory — verify it is enabled if returns are expected.
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
                log.warn("消息被退回 {}", message));
        return rabbitTemplate;
    }

    /**
     * Declares the durable, non-auto-delete direct exchange named by {@code mq.exchange}.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange payExchange() {
        return new DirectExchange(payExchange, true, false);
    }

    /**
     * Declares the durable queue named by {@code mq.ingate.queue}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue ingateQueue() {
        return new Queue(ingateQueue, true);
    }

    /**
     * Binds the in-gate queue to the direct exchange, using the queue name as the routing key.
     *
     * @return the binding definition
     */
    @Bean
    public Binding ingateQueueBinding() {
        return BindingBuilder.bind(ingateQueue()).to(payExchange()).withQueueName();
    }

    /**
     * Enables RabbitMQ transactions through Spring's transaction abstraction.
     *
     * <p>The parameter is typed as the {@link ConnectionFactory} interface so it matches
     * the declared return type of {@link #connectionFactory()} during by-type injection.
     *
     * @param connectionFactory the connection factory whose channels take part in transactions
     * @return the transaction manager
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}
